package com.code.ape.codeape.common.elasticsearch.config;

import com.code.ape.codeape.common.elasticsearch.ResetElasticSearchClient;
import com.code.ape.codeape.common.elasticsearch.properties.ElasticSearchProperties;
import io.micrometer.core.instrument.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;

import java.util.Arrays;
import java.util.Objects;

/**
 * @author 公众号：码猿技术专栏
 * @url: www.java-family.cn
 * @description  ElasticSearch的自动配置类
 */
@Slf4j
@ConditionalOnProperty(name = "elasticsearch.enabled", matchIfMissing = true)
public class ElasticSearchClientAutoConfig {
	private static final int ADDRESS_LENGTH = 2;
	private static final String HTTP_SCHEME = "http";

	@ConditionalOnMissingBean(value =RestClientBuilder.class )
	@Bean
	public RestClientBuilder restClientBuilder(ElasticSearchProperties elasticSearchProperties) {
		String[] split = elasticSearchProperties.getHosts().split(",");
		HttpHost[] hosts = Arrays.stream(split)
				.map(this::makeHttpHost)
				.filter(Objects::nonNull)
				.toArray(HttpHost[]::new);
		return RestClient.builder(hosts);
	}

	@ConditionalOnMissingBean(value =RestHighLevelClient.class )
	@Bean(name = "highLevelClient")
	public RestHighLevelClient highLevelClient(RestClientBuilder restClientBuilder,ElasticSearchProperties elasticSearchProperties){
		//配置身份验证
		final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
		credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticSearchProperties.getUser(), elasticSearchProperties.getPassword()));
		restClientBuilder.setHttpClientConfigCallback(
				httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
		//设置连接超时和套接字超时
		restClientBuilder.setRequestConfigCallback(
				requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(10000).setConnectTimeout(60000));
		//配置HTTP异步请求ES的线程数
		restClientBuilder.setHttpClientConfigCallback(
				httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultIOReactorConfig(
						IOReactorConfig.custom().setIoThreadCount(1).build()));
		//设置监听器，每次节点失败都可以监听到，可以作额外处理
		restClientBuilder.setFailureListener(new RestClient.FailureListener() {
			@Override
			public void onFailure(Node node) {
				super.onFailure(node);
				if (log.isErrorEnabled())
					log.error(node.getHost() + "--->该节点失败了");
			}
		});
		return new RestHighLevelClient(restClientBuilder);
	}

	@ConditionalOnMissingBean(value = ResetElasticSearchClient.class)
	@Bean
	public ResetElasticSearchClient resetElasticSearchClient(RestHighLevelClient client){
		return new ResetElasticSearchClient(client);
	}

	private HttpHost makeHttpHost(String str) {
		assert StringUtils.isNotEmpty(str);
		String[] address = str.split(":");
		if (address.length == ADDRESS_LENGTH) {
			String ip = address[0];
			int port = Integer.parseInt(address[1]);
			log.info("ES连接ip和port:{},{}", ip, port);
			return new HttpHost(ip, port, HTTP_SCHEME);
		} else {
			log.error("传入的ip参数不正确！");
			return null;
		}
	}
}
